package com.atguigu.cdc;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Locale;
import java.util.stream.Stream;

/* Custom deserializer: converts the original SourceRecord objects into JSON objects of the form
{
    "database": "",  database name
    "table": "",     table name
    "type": "",      operation type
    "data": {}       row data
}*/
/**
 * Flink CDC demo that reads change events for {@code gmall1021_realtime.t_user} from MySQL
 * and prints each one as a JSON string produced by {@link MySchema}.
 */
public class FlinkCDC_CustomSchema {

    /**
     * Builds and runs the pipeline: MySQL CDC source, then the custom JSON deserializer, then stdout.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to start or run
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment with a single parallel instance.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Configure the MySQL CDC source.
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop104")
                .port(3306)
                // Several databases may be monitored at once.
                .databaseList("gmall1021_realtime")
                // Table names must be qualified with their database, because the monitored
                // databases may contain tables with the same name.
                .tableList("gmall1021_realtime.t_user")
                // NOTE(review): credentials are hard-coded; move them to configuration
                // before using this outside a demo.
                .username("root")
                .password("000000")
                // Startup modes:
                //   initial  - read the existing (historical) table data on startup
                //   earliest - read from the beginning of the binlog
                //   latest   - read from the end of the binlog
                .startupOptions(StartupOptions.initial())
                // Converts each SourceRecord into a JSON string.
                .deserializer(new MySchema())
                .build();

        env.addSource(sourceFunction).print();

        env.execute();
    }

    /**
     * Deserializes Debezium {@link SourceRecord}s into JSON strings of the form
     * {@code {"database":..., "table":..., "type":..., "data":{...}}}.
     *
     * <p>Example of an incoming record:
     * <pre>
     * SourceRecord{sourcePartition={server=mysql_binlog_source},
     *              sourceOffset={file=mysql-bin.000011, pos=154, row=1, snapshot=true}}
     * ConnectRecord{topic='mysql_binlog_source.gmall1021_realtime.t_user', kafkaPartition=null,
     *     key=null, keySchema=null,
     *     value=Struct{after=Struct{id=1,name=ww},
     *                  source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,
     *                                ts_ms=0,snapshot=true,db=gmall1021_realtime,table=t_user,
     *                                server_id=0,file=mysql-bin.000011,pos=154,row=0},
     *                  op=c,ts_ms=1618720954347},
     *     valueSchema=Schema{mysql_binlog_source.gmall1021_realtime.t_user.Envelope:STRUCT},
     *     timestamp=null, headers=ConnectHeaders(headers=)}
     * </pre>
     */
    public static class MySchema implements DebeziumDeserializationSchema<String> {

        /**
         * Converts one change record to JSON and emits it to {@code collector}.
         *
         * <p>The following records are skipped instead of failing the job, because they are not
         * row-level change events:
         * <ul>
         *   <li>records whose value is not a {@link Struct};</li>
         *   <li>records that have no {@code op} field;</li>
         *   <li>records whose topic is not {@code <server>.<database>.<table>}.</li>
         * </ul>
         *
         * @param sourceRecord the record produced by the CDC source
         * @param collector    receives the JSON string for each emitted record
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            // Row change events carry a Struct envelope; anything else (e.g. a null value) is skipped.
            if (!(sourceRecord.value() instanceof Struct)) {
                return;
            }
            Struct valueStruct = (Struct) sourceRecord.value();

            // Operation type of the change (c, r, u, d). operationFor returns null when the
            // envelope has no "op" field, so such records are skipped here.
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            if (operation == null) {
                return;
            }
            String type = operation.toString().toLowerCase(Locale.ROOT);

            // Database and table come from the topic "<server>.<database>.<table>".
            // Alternative source: valueStruct.getStruct("source").getString("db" / "table").
            String[] topicParts = sourceRecord.topic().split("\\.");
            if (topicParts.length < 3) {
                return;
            }
            String dbName = topicParts[1];
            String tableName = topicParts[2];

            // Row image after the change. When "after" is null, data is an empty object.
            // NOTE(review): for deletes the row content is presumably only in "before"; confirm
            // whether downstream consumers need it.
            JSONObject dataJson = structToJson(valueStruct.getStruct("after"));

            JSONObject resultJson = new JSONObject();
            resultJson.put("database", dbName);
            resultJson.put("table", tableName);
            resultJson.put("type", type);
            resultJson.put("data", dataJson);
            collector.collect(resultJson.toJSONString());
        }

        /** Copies every field of {@code struct} into a new JSON object; returns an empty object for null. */
        private static JSONObject structToJson(Struct struct) {
            JSONObject json = new JSONObject();
            if (struct != null) {
                for (Field field : struct.schema().fields()) {
                    json.put(field.name(), struct.get(field));
                }
            }
            return json;
        }

        /** Declares that this deserializer produces {@link String} elements. */
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }

}
